{
  "metadata" : {
    "name" : "Using Spark SQL",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : null,
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "# Use SQL to look at Stocks "
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Set up "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "case class Quote(stock:String, dateString:String, price:Double) extends java.io.Serializable",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "def $(k:String)=sys.env(k)\nval hadoopConf = sparkContext.hadoopConfiguration;\nhadoopConf.set(\"fs.s3n.impl\", \"org.apache.hadoop.fs.s3native.NativeS3FileSystem\")\nhadoopConf.set(\"fs.s3.awsAccessKeyId\", $(\"AWS_ACCESS_KEY_ID\"))\nhadoopConf.set(\"fs.s3.awsSecretAccessKey\", $(\"AWS_SECRET_ACCESS_KEY\"))\nhadoopConf.set(\"fs.s3n.awsAccessKeyId\", $(\"AWS_ACCESS_KEY_ID\"))\nhadoopConf.set(\"fs.s3n.awsSecretAccessKey\", $(\"AWS_SECRET_ACCESS_KEY\"))\n()",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Read the data (S3 or local)"
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "The data we are going to use is a `170M` file of CSV lines containing the stock, the date then the price of a quote."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "val quotes = sparkContext//.textFile(\"/tmp/data/closes.csv\")\n                         .textFile(\"s3n://spark-notebook-data/closes.csv\")\n                         .map(_.split(\",\").toList)\n                         .map{ case s::d::p::Nil => Quote(s, d, p.toDouble) }\n                         .persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) // cache!\nquotes.name = \"quotes\"",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "## Create a SQL context on the data "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "//define sql context\nval sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)\n\n// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.\nimport sqlContext.implicits._",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true
    },
    "cell_type" : "code",
    "source" : "//quotes.count",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Register the RDD as a SQL table."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "quotes.toDF().registerTempTable(\"quotes\")",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "### Define a form to play with the quotes "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : ":sql[byDate] SELECT dateString, count(*) FROM quotes WHERE dateString >= '{String: date}' and price >= {Int: price} GROUP BY dateString",
    "outputs" : [ ]
  }, {
    "metadata" : { },
    "cell_type" : "markdown",
    "source" : "The following is for reactive printing."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : false
    },
    "cell_type" : "code",
    "source" : "byDate.react[String](\n  rddResult => rddResult.collect().toList.toString,// → change date to Long then keep count as Long\n  out//use wisp updated with result of the previous function\n)",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}